package cn.doitedu.hbase.hfilemr

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{ConnectionFactory, TableDescriptorBuilder}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, TableOutputFormat}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

/**
  * @date: 2019/7/17
  * @site: www.doitedu.cn
  * @author: hunter.d 涛哥
  * @qq: 657270652
  * @description:
  */
/**
  * Demo: bulk-load data into HBase by writing HFiles from Spark and then
  * importing them with [[LoadIncrementalHFiles]].
  *
  * Flow: build an RDD of (rowkey, KeyValue) pairs, SORT it by rowkey
  * (HFiles require ascending byte order), write HFiles to HDFS via
  * [[HFileOutputFormat2]], then bulk-load the directory into the table.
  */
object HFileDemo {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val spark = SparkSession.builder().master("local").appName("HFileDemo").getOrCreate()
    import spark.implicits._

    /**
      * Job / HBase configuration.
      */
    // Single source of truth for the target table and the staging directory,
    // instead of repeating the literals at each use site.
    val tableNameStr = "x"
    val hfilePath = "hdfs://spark01:8020/tmp/x2"

    val tableName = TableName.valueOf(tableNameStr)
    val conf = HBaseConfiguration.create()
    conf.set("hadoop.user.name", "root")
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableNameStr)
    conf.set("hbase.zookeeper.quorum", "spark01:2181,spark02:2181,spark03:2181")

    val td = TableDescriptorBuilder.newBuilder(tableName).build()

    // The job is used immediately below, so `lazy` bought nothing.
    val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])

    HFileOutputFormat2.configureIncrementalLoadMap(job, td)

    /**
      * Data preparation.
      */
    // BUG FIX: HFileOutputFormat2 rejects out-of-order keys
    // ("Added a key not lexically larger than previous"). Sort by rowkey
    // while it is still a String — for ASCII keys like these, String order
    // matches HBase's byte-lexicographic order. NOTE(review): for non-ASCII
    // rowkeys a byte-level comparator would be needed — confirm input domain.
    val rdd = spark
      .createDataset(Seq("b,1", "a,2", "c,3"))
      .rdd
      .map { line =>
        val arr = line.split(",")
        (arr(0), arr(1))
      }
      .sortByKey()
      .map { case (rowKey, value) =>
        val k = rowKey.getBytes()
        // Column family "f", qualifier "c" — must match the target table's schema.
        val kv = new KeyValue(k, "f".getBytes(), "c".getBytes(), value.getBytes())
        (new ImmutableBytesWritable(k), kv)
      }

    rdd.take(10).foreach(println)

    /**
      * Write HFiles to HDFS.
      */
    rdd.saveAsNewAPIHadoopFile(
      hfilePath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration()
    )

    /**
      * Bulk-load the HFiles into HBase.
      */
    // Close every HBase resource even if the bulk load throws, so we do not
    // leak the connection or the table handle (the original leaked both).
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(tableName)
      val regionLocator = conn.getRegionLocator(tableName)
      val admin = conn.getAdmin
      try {
        val loadIncrementalHFiles = new LoadIncrementalHFiles(conf)
        loadIncrementalHFiles.doBulkLoad(new Path(hfilePath), admin, table, regionLocator)
      } finally {
        admin.close()
        table.close()
      }
    } finally {
      conn.close()
    }

    spark.close()
  }
}
